package com.song.bigdata.v2;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;

import java.util.concurrent.TimeUnit;

/**
 * 时间窗口
 * 监控nc
 * 每分钟内的事件（nc发送数据）总数
 */
public class TimeWindow {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        //         3, // 尝试重启的次数
        //         org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // 间隔
        // ));
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        text
            .map(value -> 1) // 每个事件计为1
            .timeWindowAll(Time.minutes(1)) // 设置时间窗口为1分钟
            .sum(0) // 计算每分钟内的事件总数
            .print();

        env.execute("Time Window Example");
    }
}
